package com.meicai.data.engine.distribute;

import com.meicai.data.Main;
import com.meicai.data.engine.Engine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce-based implementation of {@link Engine}: runs the SkuBatchOverage
 * job (secondary sort over warehouse loc-stock records) on the cluster.
 */
public class MREngine implements Engine {

    /** HDFS input: ODS table with raw warehouse loc-stock movement records. */
    private static final String INPUT_PATH =
            "/user/hive/warehouse/ods.db/ods_wms_loc_stock_record";

    /** HDFS output: DW table receiving the computed stock-overage rows. */
    private static final String OUTPUT_PATH =
            "/user/hive/warehouse/dw.db/dw_wms_loc_stock_overage";

    /**
     * Configures and runs the "SkuBatchOverage" MapReduce job synchronously.
     *
     * <p>Uses a secondary-sort setup: a custom composite map-output key,
     * partitioner and grouping comparator (all from {@code SecondarySortApi})
     * so each reducer sees values for one SKU group in sorted order.
     *
     * <p>Any pre-existing output directory is deleted first, since Hadoop
     * refuses to start a job whose output path already exists.
     *
     * @throws Exception if job setup or submission fails, or the job
     *                   completes unsuccessfully
     */
    @Override
    public void execute() throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job job = Job.getInstance(conf, "SkuBatchOverage");
        job.setJarByClass(Main.class);

        job.setMapperClass(SkuMapper.class);
        job.setReducerClass(SkuReducer.class);
        job.setPartitionerClass(SecondarySortApi.SkuPartitioner.class);
        job.setGroupingComparatorClass(SecondarySortApi.SkuGroupingComparator.class);

        job.setMapOutputKeyClass(SecondarySortApi.SkuOutputKey.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(5);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));

        // FileOutputFormat fails fast if the target exists, so clear it first.
        Path outputPath = new Path(OUTPUT_PATH);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Throw instead of System.exit(...): exiting here would kill the
        // whole JVM and prevent callers of Engine.execute() from reacting.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("MapReduce job 'SkuBatchOverage' failed");
        }
    }
}
